/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.TestDelayingCommunicationSpi;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.testframework.GridTestUtils.mergeExchangeWaitVersion;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.LogListener.matches;

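/**
 * Tests cache partition map exchange behavior (including merged exchanges) while server and client nodes
 * are concurrently started and stopped, and while the coordinator node changes.
 */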
public class CacheExchangeMergeTest extends GridCommonAbstractTest {
    /** Maximum time, in seconds, to wait on a latch for expected messages before a test fails. */
    private static final long WAIT_SECONDS = 45;

    /** If {@code true}, nodes are configured with {@link TestRecordingCommunicationSpi}; checked before {@link #testDelaySpi}. */
    private boolean testSpi;

    /** If {@code true} and {@link #testSpi} is {@code false}, nodes are configured with {@code TestDelayExchangeMessagesSpi}. */
    private boolean testDelaySpi;

    /** Cache names {@code c1}..{@code c10}. Note that the node configuration declares caches up to {@code c25}. */
    private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"};

    /** If {@code true}, the statically configured caches are added to the configuration of started nodes. */
    private boolean cfgCache = true;

    /** Optional closure mapping an Ignite instance name to its client-mode flag; not applied when {@code null}. */
    private IgniteClosure<String, Boolean> clientC;

    /** Thread pool created before the tests of this class start and shut down after they stop. */
    private static ExecutorService executor;

    /** Logger for listen messages. It is installed as the grid logger of every started node. */
    private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);

    /**
     * {@inheritDoc}
     * <p>
     * Installs the listening logger, optionally a test communication SPI and a client-mode flag, and
     * (when {@link #cfgCache} is set) the static cache configurations {@code c1}..{@code c25}.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName).setGridLogger(listeningLog);

        // The recording SPI wins over the delaying SPI when both flags are set.
        if (testSpi || testDelaySpi) {
            igniteCfg.setCommunicationSpi(testSpi
                ? new TestRecordingCommunicationSpi()
                : new TestDelayExchangeMessagesSpi());
        }

        if (clientC != null)
            igniteCfg.setClientMode(clientC.apply(igniteInstanceName));

        if (!cfgCache)
            return igniteCfg;

        igniteCfg.setCacheConfiguration(
            cacheConfiguration("c1", ATOMIC, PARTITIONED, 0),
            cacheConfiguration("c2", ATOMIC, PARTITIONED, 1),
            cacheConfiguration("c3", ATOMIC, PARTITIONED, 2),
            cacheConfiguration("c4", ATOMIC, PARTITIONED, 10),
            cacheConfiguration("c5", ATOMIC, REPLICATED, 0),
            cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0),
            cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1),
            cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2),
            cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10),
            cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0),
            cacheConfiguration("c11", ATOMIC, PARTITIONED, 0),
            cacheConfiguration("c12", ATOMIC, PARTITIONED, 1),
            cacheConfiguration("c13", ATOMIC, PARTITIONED, 2),
            cacheConfiguration("c14", ATOMIC, PARTITIONED, 10),
            cacheConfiguration("c15", ATOMIC, REPLICATED, 0),
            cacheConfiguration("c16", TRANSACTIONAL, PARTITIONED, 0),
            cacheConfiguration("c17", TRANSACTIONAL, PARTITIONED, 1),
            cacheConfiguration("c18", TRANSACTIONAL, PARTITIONED, 2),
            cacheConfiguration("c19", TRANSACTIONAL, PARTITIONED, 10),
            cacheConfiguration("c20", TRANSACTIONAL, REPLICATED, 0),
            // There were MVCC caches, but now Ignite does not support them.
            cacheConfiguration("c21", TRANSACTIONAL, PARTITIONED, 0),
            cacheConfiguration("c22", TRANSACTIONAL, PARTITIONED, 1),
            cacheConfiguration("c23", TRANSACTIONAL, PARTITIONED, 2),
            cacheConfiguration("c24", TRANSACTIONAL, PARTITIONED, 10),
            cacheConfiguration("c25", TRANSACTIONAL, REPLICATED, 0)
        );

        return igniteCfg;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Creates the shared fixed-size thread pool, sized by the number of available processors.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        int poolSize = Runtime.getRuntime().availableProcessors();

        IgniteThreadFactory threadFactory = new IgniteThreadFactory("testscope", "cache-exchange-merge-tests");

        executor = Executors.newFixedThreadPool(poolSize, threadFactory);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Shuts down the shared thread pool created in {@link #beforeTestsStarted()} and then delegates to the
     * parent implementation, mirroring the {@code super} call made on start-up.
     */
    @Override protected void afterTestsStopped() throws Exception {
        try {
            if (executor != null) {
                executor.shutdown();

                // Do not keep a reference to a terminated pool in the static field.
                executor = null;
            }
        }
        finally {
            super.afterTestsStopped();
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Clears the log listeners of the listening logger and stops all grids before delegating to the parent.
     */
    @Override protected void afterTest() throws Exception {
        // Listeners registered by the finished test must not observe messages of the next test.
        listeningLog.clearListeners();

        // Stop all grids using the no-wait variant of the base-class helper.
        stopAllGridsNoWait();

        super.afterTest();
    }

    /**
     * Builds a {@code FULL_SYNC} cache configuration with the given name, atomicity mode and cache mode.
     * The number of backups is applied only to {@code PARTITIONED} caches.
     *
     * @param name Cache name.
     * @param atomicityMode Cache atomicity mode.
     * @param cacheMode Cache mode.
     * @param backups Number of backups (ignored unless {@code cacheMode} is {@code PARTITIONED}).
     * @return Cache configuration.
     */
    private CacheConfiguration cacheConfiguration(String name,
        CacheAtomicityMode atomicityMode,
        CacheMode cacheMode,
        int backups
    ) {
        CacheConfiguration cacheCfg = new CacheConfiguration(name)
            .setAtomicityMode(atomicityMode)
            .setWriteSynchronizationMode(FULL_SYNC)
            .setCacheMode(cacheMode);

        return cacheMode == PARTITIONED ? cacheCfg.setBackups(backups) : cacheCfg;
    }

    /**
     * Starts 6 servers and 3 clients with the delaying communication SPI and
     * {@code IGNITE_EXCHANGE_MERGE_DELAY} set to {@code 2000}. Three threads then each stop one of the nodes
     * 1..3 after a random pause, while five other threads each start a new node (a client in roughly one
     * case out of three) and randomly stop it again. Caches are checked once all threads finish.
     * <p>
     * The previous value of the system property is restored when the test ends.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDelayExchangeMessages() throws Exception {
        testDelaySpi = true;

        // Remember the prior value so that an externally configured delay is restored, not silently dropped.
        final String oldMergeDelay = System.getProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);

        System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "2000");

        try {
            final int srvs = 6;
            final int clients = 3;

            startGridsMultiThreaded(srvs);

            for (int i = 0; i < clients; i++)
                startClientGrid(srvs + i);

            final int initNodes = srvs + clients;

            // incrementAndGet() yields 1, 2, 3, so node 0 is never stopped by these threads.
            final AtomicInteger stopIdx = new AtomicInteger();

            IgniteInternalFuture<?> stopFut = GridTestUtils.runMultiThreadedAsync(() -> {
                Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 1);

                stopGrid(stopIdx.incrementAndGet());

                return null;
            }, 3, "stop-srv");

            final AtomicInteger startIdx = new AtomicInteger(initNodes);

            IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(() -> {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                int nodeIdx = startIdx.incrementAndGet();

                if (rnd.nextInt(3) == 0) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                if (rnd.nextBoolean()) {
                    log.info("Stop started node: " + nodeIdx);

                    stopGrid(nodeIdx);
                }

                return null;
            }, 5, "start-node");

            stopFut.get();
            startFut.get();

            checkCaches();
        }
        finally {
            if (oldMergeDelay != null)
                System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, oldMergeDelay);
            else
                System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
        }
    }

    /**
     * Runs three iterations. Each iteration starts 1..3 servers and 0..2 clients, calls
     * {@code mergeExchangeWaitVersion} on the first server for the version {@code initNodes + threads},
     * starts 8 more nodes concurrently (a client in roughly one case out of three), checks caches and
     * stops all grids.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeStartRandomClientsServers() throws Exception {
        final int threads = 8;

        for (int iter = 0; iter < 3; iter++) {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            final int srvs = rnd.nextInt(3) + 1;
            final int clients = rnd.nextInt(3);

            log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", clients=" + clients + ']');

            Ignite crd = startGrids(srvs);

            for (int clientIdx = srvs; clientIdx < srvs + clients; clientIdx++)
                startClientGrid(clientIdx);

            final int initNodes = srvs + clients;

            mergeExchangeWaitVersion(crd, initNodes + threads);

            final AtomicInteger nextIdx = new AtomicInteger(initNodes);

            GridTestUtils.runMultiThreadedAsync(() -> {
                int nodeIdx = nextIdx.incrementAndGet();

                boolean client = ThreadLocalRandom.current().nextInt(3) == 0;

                if (client) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                return null;
            }, threads, "test-thread").get();

            checkCaches();

            stopAllGrids();
        }
    }

    /**
     * Runs three iterations. Each iteration starts 5 servers and 5 clients, then lets 8 threads each either
     * stop a not-yet-chosen node with index in {@code [1, initNodes - 1]} or start a new node (a client in
     * roughly one case out of five). Caches are checked afterwards and all grids are stopped.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeStartStopRandomClientsServers() throws Exception {
        final int srvs = 5;
        final int clients = 5;
        final int threads = 8;
        final int initNodes = srvs + clients;

        for (int iter = 0; iter < 3; iter++) {
            log.info("Iteration: " + iter);

            Ignite crd = startGrids(srvs);

            for (int clientIdx = srvs; clientIdx < initNodes; clientIdx++)
                startClientGrid(clientIdx);

            mergeExchangeWaitVersion(crd, initNodes + threads);

            final AtomicInteger nextIdx = new AtomicInteger(initNodes);

            // Indexes already picked for stopping; guarantees each node is stopped by one thread only.
            final GridConcurrentHashSet<Integer> stopped = new GridConcurrentHashSet<>();

            IgniteInternalFuture<?> actionsFut = GridTestUtils.runMultiThreadedAsync(() -> {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                if (rnd.nextBoolean()) {
                    Integer stopIdx;

                    do {
                        stopIdx = rnd.nextInt(initNodes - 1) + 1;
                    }
                    while (!stopped.add(stopIdx));

                    log.info("Stop node: " + stopIdx);

                    stopGrid(getTestIgniteInstanceName(stopIdx), true, false);

                    return null;
                }

                int nodeIdx = nextIdx.incrementAndGet();

                if (rnd.nextInt(5) == 0) {
                    log.info("Start client: " + nodeIdx);

                    startClientGrid(nodeIdx);
                }
                else {
                    log.info("Start server: " + nodeIdx);

                    startGrid(nodeIdx);
                }

                return null;
            }, threads, "test-thread");

            actionsFut.get();

            checkCaches();

            stopAllGrids();
        }
    }

    /**
     * Runs the concurrent start scenario with server nodes only.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testConcurrentStartServers() throws Exception {
        concurrentStart(false);
    }

    /**
     * Runs the concurrent start scenario where each started node may randomly be a client.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testConcurrentStartServersAndClients() throws Exception {
        concurrentStart(true);
    }

    /**
     * Repeatedly starts node 0, then starts nodes 1..10 from 10 concurrent threads, checking caches on each
     * node right after it starts. After all threads finish, caches are checked, one more server (index 1000)
     * is started, caches are checked again and all grids are stopped.
     *
     * @param withClients If {@code true} also starts client nodes.
     * @throws Exception If failed.
     */
    private void concurrentStart(final boolean withClients) throws Exception {
        // Iteration count comes from the test scale-factor helper (arguments 5 and 1).
        final int iterations = GridTestUtils.SF.applyLB(5, 1);

        for (int iter = 0; iter < iterations; iter++) {
            log.info("Iteration: " + iter);

            startGrid(0);

            final AtomicInteger nextIdx = new AtomicInteger(1);

            GridTestUtils.runMultiThreadedAsync(() -> {
                int nodeIdx = nextIdx.getAndIncrement();

                boolean client = withClients && ThreadLocalRandom.current().nextBoolean();

                Ignite node = client ? startClientGrid(nodeIdx) : startGrid(nodeIdx);

                checkNodeCaches(node, nodeIdx * 1000, 1000);

                return null;
            }, 10, "start-node").get();

            checkCaches();

            startGrid(1000);

            checkCaches();

            stopAllGrids();
        }
    }

    /**
     * Starts server 0, asynchronously starts server 1, waits for the exchange for version 2 to start on
     * server 0 and then asynchronously starts client 2. Afterwards checks caches and the exchange versions
     * recorded on every node.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10186")
    @Test
    public void testMergeServerAndClientJoin1() throws Exception {
        final IgniteEx crd = startGrid(0);

        mergeExchangeWaitVersion(crd, 3);

        IgniteInternalFuture<?> srvFut = GridTestUtils.runMultiThreadedAsync(() -> {
            startGrid(1);

            return null;
        }, 1, "start-srv");

        // The client must join only after the server join exchange has started on the first node.
        waitForExchangeStart(crd, 2);

        IgniteInternalFuture<?> clientFut = GridTestUtils.runMultiThreadedAsync(() -> {
            startClientGrid(2);

            return null;
        }, 1, "start-client");

        srvFut.get();
        clientFut.get();

        checkCaches();

        checkExchanges(crd, 1, 3);

        for (int nodeIdx = 1; nodeIdx <= 2; nodeIdx++)
            checkExchanges(ignite(nodeIdx), 3);
    }

    /**
     * Runs the "start caches on join and merge joins" scenario with 2 joining server nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndJoinMerge_2_nodes() throws Exception {
        startCacheOnJoinAndJoinMerge1(2, false);
    }

    /**
     * Runs the "start caches on join and merge joins" scenario with 4 joining server nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndJoinMerge_4_nodes() throws Exception {
        startCacheOnJoinAndJoinMerge1(4, false);
    }

    /**
     * Runs the "start caches on join and merge joins" scenario with 5 joining nodes, some of them clients.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndJoinMerge_WithClients() throws Exception {
        startCacheOnJoinAndJoinMerge1(5, true);
    }

    /**
     * Starts node 0 without the static cache configurations, then asynchronously starts {@code nodes} more
     * nodes that do carry them, and checks caches. With {@code withClients}, nodes whose instance index is
     * even are started in client mode.
     *
     * @param nodes Number of nodes to start.
     * @param withClients If {@code true} starts both servers and clients.
     * @throws Exception If failed.
     */
    private void startCacheOnJoinAndJoinMerge1(int nodes, boolean withClients) throws Exception {
        // The first node starts with no configured caches; joining nodes bring them.
        cfgCache = false;

        final IgniteEx crd = startGrid(0);

        mergeExchangeWaitVersion(crd, nodes + 1);

        if (withClients)
            clientC = nodeName -> getTestIgniteInstanceIndex(nodeName) % 2 == 0;

        cfgCache = true;

        startGridsAsync(crd, 1, nodes).get();

        checkCaches();
    }

    /**
     * With {@code IGNITE_EXCHANGE_HISTORY_SIZE} set to 5, starts nodes in three batches of three, then
     * stops two nodes, and verifies on every step that the exchange futures history of each node is
     * non-empty and does not exceed the configured size. The previous property value is restored at the end.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeAndHistoryCleanup() throws Exception {
        final int histSize = 5;

        final String prevHistSize = System.getProperty(IGNITE_EXCHANGE_HISTORY_SIZE);

        System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, String.valueOf(histSize));

        try {
            final Ignite crd = startGrid(0);

            int topVer = 1;

            // Each batch adds three nodes, so the expected version advances by three.
            for (int batch = 0; batch < 3; batch++, topVer += 3) {
                mergeExchangeWaitVersion(crd, topVer + 3);

                startGridsAsync(crd, topVer, 3).get();
            }

            checkHistorySize(histSize);

            awaitPartitionMapExchange();

            checkHistorySize(histSize);

            mergeExchangeWaitVersion(crd, topVer + 2);

            for (int nodeIdx = 1; nodeIdx <= 2; nodeIdx++)
                stopGrid(nodeIdx);

            checkHistorySize(histSize);

            awaitPartitionMapExchange();

            checkHistorySize(histSize);
        }
        finally {
            if (prevHistSize == null)
                System.clearProperty(IGNITE_EXCHANGE_HISTORY_SIZE);
            else
                System.setProperty(IGNITE_EXCHANGE_HISTORY_SIZE, prevHistSize);
        }
    }

    /**
     * Asserts that at least one grid is running and that, on every running grid, the list of exchange
     * futures is non-empty and holds at most {@code histSize} entries.
     *
     * @param histSize History size.
     */
    private void checkHistorySize(int histSize) {
        List<Ignite> grids = G.allGrids();

        assertTrue(!grids.isEmpty());

        for (Ignite grid : grids) {
            IgniteEx gridEx = (IgniteEx)grid;

            int futsCnt = gridEx.context().cache().context().exchange().exchangeFutures().size();

            assertTrue("Unexpected size: " + futsCnt, futsCnt > 0 && futsCnt <= histSize);
        }
    }

    /**
     * Starts two nodes without the static caches, asynchronously starts two more nodes with them, stops
     * node 1 while they join, and then checks caches and the exchange versions recorded on the
     * remaining nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndMergeWithFail() throws Exception {
        cfgCache = false;

        final Ignite crd = startGrids(2);

        mergeExchangeWaitVersion(crd, 5);

        // Joining nodes carry the static cache configurations.
        cfgCache = true;

        IgniteInternalFuture<?> joinFut = startGridsAsync(crd, 2, 2);

        stopGrid(1);

        joinFut.get();

        checkCaches();

        checkExchanges(crd, 1, 2, 3, 5);
        checkExchanges(ignite(2), 3, 5);
        checkExchanges(ignite(3), 5);
    }

    /**
     * Starts two nodes without the static caches, asynchronously starts two more nodes with them and
     * stops node 0 while they join; caches are checked once the joins complete.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
        cfgCache = false;

        final Ignite crd = startGrids(2);

        mergeExchangeWaitVersion(crd, 5);

        // Joining nodes carry the static cache configurations.
        cfgCache = true;

        IgniteInternalFuture<?> joinFut = startGridsAsync(crd, 2, 2);

        stopGrid(0);

        joinFut.get();

        checkCaches();
    }

    /**
     * Starts a single node without the static caches, asynchronously starts two more nodes with them and
     * stops node 0 while they join; caches are checked once the joins complete.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception {
        cfgCache = false;

        final Ignite crd = startGrid(0);

        mergeExchangeWaitVersion(crd, 3);

        // Joining nodes carry the static cache configurations.
        cfgCache = true;

        IgniteInternalFuture<?> joinFut = startGridsAsync(crd, 1, 2);

        stopGrid(0);

        joinFut.get();

        checkCaches();
    }

    /**
     * Starts server 0, then starts servers 1 and 2 from two concurrent threads, and checks caches and the
     * exchange versions recorded on all three nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersJoin1() throws Exception {
        IgniteEx crd = startGrid(0);

        mergeExchangeWaitVersion(crd, 3);

        final AtomicInteger nextIdx = new AtomicInteger(1);

        GridTestUtils.runMultiThreadedAsync(() -> {
            startGrid(nextIdx.getAndIncrement());

            return null;
        }, 2, "start-node").get();

        checkCaches();

        checkExchanges(crd, 1, 3);

        for (int nodeIdx = 1; nodeIdx <= 2; nodeIdx++)
            checkExchanges(ignite(nodeIdx), 3);
    }

    /**
     * Starts server 0 and clients 1 and 2, then starts servers 3 and 4 from two concurrent threads, and
     * checks caches and the exchange versions recorded on all five nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServerJoin1ClientsInTopology() throws Exception {
        IgniteEx crd = startGrid(0);

        for (int clientIdx = 1; clientIdx <= 2; clientIdx++)
            startClientGrid(clientIdx);

        mergeExchangeWaitVersion(crd, 5);

        final AtomicInteger nextIdx = new AtomicInteger(3);

        GridTestUtils.runMultiThreadedAsync(() -> {
            startGrid(nextIdx.getAndIncrement());

            return null;
        }, 2, "start-node").get();

        checkCaches();

        checkExchanges(crd, 1, 2, 3, 5);
        checkExchanges(ignite(1), 2, 3, 5);
        checkExchanges(ignite(2), 3, 5);

        for (int srvIdx = 3; srvIdx <= 4; srvIdx++)
            checkExchanges(ignite(srvIdx), 5);
    }

    /**
     * Starts three nodes, asynchronously starts three more, checks caches, then stops node 0 and checks
     * caches again.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeAndNewCoordinator() throws Exception {
        final Ignite crd = startGrids(3);

        mergeExchangeWaitVersion(crd, 6);

        startGridsAsync(crd, 3, 3).get();

        checkCaches();

        // Caches must remain consistent after node 0 leaves.
        stopGrid(0);

        checkCaches();
    }

    /**
     * Servers fail scenario: no wait for rebalance, no rebalance delay, merge topology version 8.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_1() throws Exception {
        mergeServersFail1(false, false, 8);
    }

    /**
     * Servers fail scenario: wait for rebalance, no rebalance delay, merge topology version 8.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_2() throws Exception {
        mergeServersFail1(true, false, 8);
    }

    /**
     * Servers fail scenario: no wait for rebalance, rebalance delayed, merge topology version 8.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_3() throws Exception {
        mergeServersFail1(false, true, 8);
    }

    /**
     * Servers fail scenario: wait for rebalance, rebalance delayed, merge topology version 8.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_4() throws Exception {
        mergeServersFail1(true, true, 8);
    }

    /**
     * Servers fail scenario: no wait for rebalance, no rebalance delay, merge topology version 7.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_5() throws Exception {
        mergeServersFail1(false, false, 7);
    }

    /**
     * Servers fail scenario: wait for rebalance, no rebalance delay, merge topology version 7.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_6() throws Exception {
        mergeServersFail1(true, false, 7);
    }

    /**
     * Servers fail scenario: no wait for rebalance, rebalance delayed, merge topology version 7.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_7() throws Exception {
        mergeServersFail1(false, true, 7);
    }

    /**
     * Servers fail scenario: wait for rebalance, rebalance delayed, merge topology version 7.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersFail1_8() throws Exception {
        mergeServersFail1(true, true, 7);
    }

    /**
     * Starts five servers with the recording communication SPI, optionally blocks partition demand
     * messages, and stops nodes 4, 3 and 2. For merge version 7 it waits until topology version (7, 0) is
     * ready on node 0 before stopping node 2. Affinity, caches and topology consistency are then checked.
     * Finally the collected merged discovery events are verified: their count must equal
     * {@code mergeTopVer - 6} and each must refer to node 2 or node 3.
     *
     * @param waitRebalance Wait for rebalance end before start tested topology change.
     * @param delayRebalance Delay rebalancing before checking caches.
     * @param mergeTopVer Merge topology version (7 or 8).
     * @throws Exception If failed.
     */
    private void mergeServersFail1(boolean waitRebalance, boolean delayRebalance, int mergeTopVer) throws Exception {
        testSpi = true;

        final IgniteEx crd = startGrids(5);

        if (waitRebalance)
            awaitPartitionMapExchange();

        if (delayRebalance) {
            for (Ignite node : G.allGrids()) {
                TestRecordingCommunicationSpi.spi(node)
                    .blockMessages((rcvNode, msg) -> msg instanceof GridDhtPartitionDemandMessage);
            }
        }

        final List<DiscoveryEvent> mergedEvts = new ArrayList<>();

        mergeExchangeWaitVersion(crd, mergeTopVer, mergedEvts);

        // Node IDs must be captured before the nodes are stopped.
        UUID node3Id = grid(3).localNode().id();
        UUID node2Id = grid(2).localNode().id();

        stopGrid(getTestIgniteInstanceName(4), true, false);
        stopGrid(getTestIgniteInstanceName(3), true, false);

        if (mergeTopVer == 7) {
            AffinityTopologyVersion readyVer = new AffinityTopologyVersion(7, 0);

            waitForReadyTopology(grid(0).cachex(cacheNames[0]).context().topology(), readyVer);
        }

        stopGrid(getTestIgniteInstanceName(2), true, false);

        checkAffinity();

        checkCaches0();

        checkAffinity();

        if (delayRebalance) {
            for (Ignite node : G.allGrids())
                TestRecordingCommunicationSpi.spi(node).stopBlock();
        }

        awaitPartitionMapExchange();

        checkTopologiesConsistency();

        checkCaches0();

        int mergedCnt = mergedEvts.size();

        assertTrue("Unexpected number of merged disco events: " + mergedCnt, mergedCnt == mergeTopVer - 6);

        for (DiscoveryEvent mergedEvt : mergedEvts) {
            ClusterNode evtNode = mergedEvt.eventNode();

            assertTrue("eventNode is null for DiscoEvent " + mergedEvt, evtNode != null);

            UUID evtNodeId = evtNode.id();

            assertTrue("Unexpected eventNode ID: "
                    + evtNodeId + " while expecting " + node2Id + " or " + node3Id,
                evtNodeId.equals(node2Id) || evtNodeId.equals(node3Id));
        }
    }

    /**
     * Servers and clients fail scenario, without waiting for rebalance before the topology change.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersAndClientsFail1() throws Exception {
        mergeServersAndClientsFail(false);
    }

    /**
     * Servers and clients fail scenario, waiting for rebalance before the topology change.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeServersAndClientsFail2() throws Exception {
        mergeServersAndClientsFail(true);
    }

    /**
     * Starts six nodes of which nodes 2 and 3 are clients, stops nodes 1 through 4, checks affinity, then
     * asynchronously starts two more nodes and checks caches.
     *
     * @param waitRebalance Wait for rebalance end before start tested topology change.
     * @throws Exception If failed.
     */
    private void mergeServersAndClientsFail(boolean waitRebalance) throws Exception {
        // Only the instances with indexes 2 and 3 run in client mode.
        clientC = nodeName ->
            nodeName.equals(getTestIgniteInstanceName(2)) || nodeName.equals(getTestIgniteInstanceName(3));

        final Ignite crd = startGrids(6);

        if (waitRebalance)
            awaitPartitionMapExchange();

        mergeExchangeWaitVersion(crd, 10);

        for (int nodeIdx = 1; nodeIdx <= 4; nodeIdx++)
            stopGrid(getTestIgniteInstanceName(nodeIdx), true, false);

        checkAffinity();

        mergeExchangeWaitVersion(crd, 12);

        startGridsAsync(crd, 6, 2).get();

        checkCaches();
    }

    /**
     * Runs the no-merge coordinator change scenario on node join with 4 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            exchangeCoordinatorChangeNoMerge(4, true, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Runs the no-merge coordinator change scenario on node join with 8 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run. Currently ignored.
     *
     * @throws Exception If failed.
     */
    @Test
    @Ignore
    public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            exchangeCoordinatorChangeNoMerge(8, true, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Runs the no-merge coordinator change scenario on node failure with 5 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            exchangeCoordinatorChangeNoMerge(5, false, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Runs the no-merge coordinator change scenario on node failure with 8 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            exchangeCoordinatorChangeNoMerge(8, false, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Runs the merged-join coordinator change scenario with 4 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeJoinExchangesCoordinatorChange1_4_servers() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            mergeJoinExchangesCoordinatorChange1(4, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Runs the merged-join coordinator change scenario with 8 servers, once for every
     * {@code CoordinatorChangeMode} value, stopping all grids after each run.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeJoinExchangesCoordinatorChange1_8_servers() throws Exception {
        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();

        for (CoordinatorChangeMode changeMode : modes) {
            mergeJoinExchangesCoordinatorChange1(8, changeMode);

            stopAllGrids();
        }
    }

    /**
     * Starts {@code srvs} servers, configures blocking of exchange finish messages according to
     * {@code mode}, asynchronously starts two more nodes, stops the node with index 0 and finally
     * verifies caches.
     *
     * @param srvs Number of server nodes.
     * @param mode Test mode.
     * @throws Exception If failed.
     */
    private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode)
        throws Exception {
        log.info("Test mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');

        testSpi = true;

        Ignite firstSrv = startGrids(srvs);

        CountDownLatch msgLatch = blockExchangeFinish(srvs, mode);

        IgniteInternalFuture<?> startFut = startGridsAsync(firstSrv, srvs, 2);

        // A null latch means there is no message to wait for.
        boolean msgsSent = msgLatch == null || msgLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

        if (!msgsSent)
            fail("Failed to wait for expected messages.");

        stopGrid(getTestIgniteInstanceName(0), true, false);

        startFut.get();

        checkCaches();
    }

    /**
     * Runs {@code mergeJoinExchangeCoordinatorChange2} twice on 4 servers with 2 joining nodes:
     * first with nodes 1-4 blocked and node 5 awaited, then (after stopping all grids) with
     * nodes 1, 2, 3, 5 blocked and node 4 awaited.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeJoinExchangesCoordinatorChange2_4_servers() throws Exception {
        final int srvs = 4;
        final int joining = 2;

        List<Integer> blocked = F.asList(1, 2, 3, 4);
        List<Integer> awaited = F.asList(5);

        mergeJoinExchangeCoordinatorChange2(srvs, joining, blocked, awaited);

        stopAllGrids();

        blocked = F.asList(1, 2, 3, 5);
        awaited = F.asList(4);

        mergeJoinExchangeCoordinatorChange2(srvs, joining, blocked, awaited);
    }

    /**
     * Starts {@code srvs} servers, sets the merge wait version to {@code srvs + startNodes}, blocks
     * exchange finish messages for topology version {@code srvs + 1} and above, asynchronously starts
     * {@code startNodes} more nodes, stops the node with index 0 and finally verifies caches.
     *
     * @param srvs Number of server nodes.
     * @param startNodes Number of nodes to start.
     * @param blockNodes Nodes which do not receive messages.
     * @param waitMsgNodes Nodes which should receive messages.
     * @throws Exception If failed.
     */
    private void mergeJoinExchangeCoordinatorChange2(final int srvs,
        final int startNodes,
        List<Integer> blockNodes,
        List<Integer> waitMsgNodes
    ) throws Exception {
        testSpi = true;

        Ignite firstSrv = startGrids(srvs);

        mergeExchangeWaitVersion(firstSrv, srvs + startNodes);

        CountDownLatch msgLatch = blockExchangeFinish(firstSrv, srvs + 1, blockNodes, waitMsgNodes);

        IgniteInternalFuture<?> startFut = startGridsAsync(firstSrv, srvs, startNodes);

        boolean msgsSent = msgLatch == null || msgLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

        if (!msgsSent)
            fail("Failed to wait for expected messages.");

        stopGrid(getTestIgniteInstanceName(0), true, false);

        startFut.get();

        checkCaches();
    }

    /**
     * Starts 4 servers, sets the merge wait version to 6, blocks exchange finish messages
     * (version 5 and above) for nodes 1-4 while awaiting the message to node 5, starts two nodes
     * concurrently, stops the node with index 0 and verifies caches.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMergeExchangeCoordinatorChange4() throws Exception {
        testSpi = true;

        final int srvs = 4;

        Ignite firstSrv = startGrids(srvs);

        mergeExchangeWaitVersion(firstSrv, 6);

        CountDownLatch msgLatch = blockExchangeFinish(firstSrv, 5, F.asList(1, 2, 3, 4), F.asList(5));

        // Shared counter hands out distinct indexes to the concurrently starting nodes.
        final AtomicInteger nextIdx = new AtomicInteger(srvs);

        Callable<Void> startNode = new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(nextIdx.getAndIncrement());

                return null;
            }
        };

        IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(startNode, 2, "start-node");

        boolean msgsSent = msgLatch == null || msgLatch.await(WAIT_SECONDS, TimeUnit.SECONDS);

        if (!msgsSent)
            fail("Failed to wait for expected messages.");

        stopGrid(getTestIgniteInstanceName(0), true, false);

        startFut.get();

        checkCaches();
    }

    /**
     * Starts {@code srvs} servers, configures blocking of exchange finish messages according to
     * {@code mode}, asynchronously triggers a topology change (node join or node stop), waits until
     * the exchange for topology version {@code srvs + 1} has started on node 0, stops node 0 and
     * finally verifies caches.
     *
     * @param srvs Number of servers.
     * @param join If {@code true} starts new node, otherwise stops node.
     * @param mode Tested scenario.
     * @throws Exception If failed.
     */
    private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean join, CoordinatorChangeMode mode) throws Exception {
        // Log the actual scenario name and all parameters so that runs can be told apart in the log.
        log.info("Test exchangeCoordinatorChangeNoMerge [nodes=" + srvs + ", join=" + join + ", mode=" + mode + ']');

        testSpi = true;

        startGrids(srvs);

        CountDownLatch latch = blockExchangeFinish(srvs, mode);

        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                if (join)
                    startGrid(srvs);
                else
                    stopGrid(srvs - 1);

                return null;
            }
        });

        waitForExchangeStart(ignite(0), srvs + 1);

        // A null latch means the selected mode has no message to wait for.
        if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
            fail("Failed to wait for expected messages.");

        stopGrid(0);

        fut.get();

        checkCaches();
    }

    /**
     * Configures blocking of exchange finish messages sent by node 0 for topology version
     * {@code srvs + 1}, depending on the tested scenario.
     *
     * @param srvs Number of server nodes.
     * @param mode Test scenario.
     * @return Awaited state latch, or {@code null} for {@link CoordinatorChangeMode#NOBODY_RCVD}.
     * @throws Exception If failed.
     */
    private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode mode) throws Exception {
        Ignite crd = ignite(0);

        long exchTopVer = srvs + 1;

        // Index of the single node which is allowed to receive the message.
        List<Integer> rcvNodes;

        switch (mode) {
            case NOBODY_RCVD:
                blockExchangeFinish(crd, exchTopVer);

                return null;

            case NEW_CRD_RCDV:
                rcvNodes = F.asList(1);

                break;

            case NON_CRD_RCVD:
                assert srvs > 2 : srvs;

                rcvNodes = F.asList(2);

                break;

            default:
                fail();

                return null;
        }

        return blockExchangeFinish(crd, exchTopVer, blockNodes(srvs, rcvNodes), rcvNodes);
    }

    /**
     * Collects every index from {@code 0} to {@code srvs} (inclusive) that is not listed
     * in {@code waitNodes}.
     *
     * @param srvs Number of servers.
     * @param waitNodes Nodes which should receive message.
     * @return Blocked nodes indexes.
     */
    private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) {
        List<Integer> blocked = new ArrayList<>();

        int limit = srvs + 1;
        int idx = 0;

        while (idx < limit) {
            boolean awaited = waitNodes.contains(idx);

            if (!awaited)
                blocked.add(idx);

            idx++;
        }

        return blocked;
    }

    /**
     * Makes the communication SPI of {@code crd} block every {@code GridDhtPartitionsFullMessage}
     * whose exchange ID is not {@code null} and refers exactly to the given topology version.
     *
     * @param crd Exchange coordinator.
     * @param topVer Exchange topology version.
     */
    private void blockExchangeFinish(Ignite crd, long topVer) {
        final AffinityTopologyVersion exchVer = new AffinityTopologyVersion(topVer);

        TestRecordingCommunicationSpi.spi(crd).blockMessages((node, msg) -> {
            if (!(msg instanceof GridDhtPartitionsFullMessage))
                return false;

            GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg;

            return fullMsg.exchangeId() != null && fullMsg.exchangeId().topologyVersion().equals(exchVer);
        });
    }

    /**
     * Makes the communication SPI of {@code crd} filter {@code GridDhtPartitionsFullMessage}s whose
     * exchange ID is not {@code null} and whose topology version is not lower than {@code topVer}:
     * messages addressed to {@code blockNodes} are blocked, messages addressed to
     * {@code waitMsgNodes} are let through and count the returned latch down.
     *
     * @param crd Exchange coordinator.
     * @param topVer Exchange topology version.
     * @param blockNodes Nodes which do not receive messages.
     * @param waitMsgNodes Nodes which should receive messages.
     * @return Awaited state latch, its initial count is the size of {@code waitMsgNodes}.
     */
    private CountDownLatch blockExchangeFinish(Ignite crd,
        long topVer,
        final List<Integer> blockNodes,
        final List<Integer> waitMsgNodes
    ) {
        log.info("blockExchangeFinish [crd=" + crd.cluster().localNode().id() +
            ", block=" + blockNodes +
            ", wait=" + waitMsgNodes + ']');

        final AffinityTopologyVersion minVer = new AffinityTopologyVersion(topVer);

        final CountDownLatch rcvLatch = new CountDownLatch(waitMsgNodes.size());

        TestRecordingCommunicationSpi.spi(crd).blockMessages((node, msg) -> {
            if (!(msg instanceof GridDhtPartitionsFullMessage))
                return false;

            GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg;

            // Messages without exchange ID or for earlier topology versions are never touched.
            if (fullMsg.exchangeId() == null || fullMsg.exchangeId().topologyVersion().compareTo(minVer) < 0)
                return false;

            String name = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);

            assert name != null : node;

            for (Integer blockIdx : blockNodes) {
                if (name.equals(getTestIgniteInstanceName(blockIdx)))
                    return true;
            }

            for (Integer waitIdx : waitMsgNodes) {
                if (!name.equals(getTestIgniteInstanceName(waitIdx)))
                    continue;

                log.info("Coordinators sends awaited message [node=" + node.id() + ']');

                rcvLatch.countDown();
            }

            return false;
        });

        return rcvLatch;
    }

    /**
     * Runs the verification sequence used at the end of the tests: affinity check, cache
     * operations check and affinity check again, then waits for partition map exchange and
     * checks topologies consistency followed by one more cache operations check.
     *
     * @throws Exception If failed.
     */
    private void checkCaches() throws Exception {
        checkAffinity();

        checkCaches0();

        checkAffinity();

        awaitPartitionMapExchange();

        // Partition owners are compared only after partition map exchange has been awaited.
        checkTopologiesConsistency();

        checkCaches0();
    }

    /**
     * Asserts that at least one grid is started and runs {@code checkNodeCaches} for each
     * started grid.
     *
     * @throws Exception If failed.
     */
    private void checkCaches0() throws Exception {
        List<Ignite> started = G.allGrids();

        assertTrue(!started.isEmpty());

        for (int i = 0; i < started.size(); i++)
            checkNodeCaches(started.get(i));
    }

    /**
     * Checks that after exchange all nodes have consistent state about partition owners.
     * <p>
     * The started node with the smallest order is used as the reference: for every other node,
     * every cache and every partition the list of owners must be equal to the list of owners
     * known to the reference node.
     *
     * @throws Exception If failed.
     */
    private void checkTopologiesConsistency() throws Exception {
        List<Ignite> nodes = G.allGrids();

        IgniteEx crdNode = null;

        // Pick the node with the smallest order as the reference (coordinator) node.
        for (Ignite node : nodes) {
            ClusterNode locNode = node.cluster().localNode();

            if (crdNode == null || locNode.order() < crdNode.localNode().order())
                crdNode = (IgniteEx)node;
        }

        for (Ignite node : nodes) {
            IgniteEx node0 = (IgniteEx)node;

            if (node0.localNode().id().equals(crdNode.localNode().id()))
                continue;

            for (IgniteInternalCache<?, ?> cache : node0.context().cache().caches()) {
                int partitions = cache.context().affinity().partitions();

                for (int p = 0; p < partitions; p++) {
                    List<ClusterNode> crdOwners = crdNode.cachex(cache.name()).cache().context().topology().owners(p);

                    List<ClusterNode> owners = cache.context().topology().owners(p);

                    // Name the cache, partition and nodes so a mismatch can be located from the failure alone.
                    assertEquals("Partition owners differ from coordinator [cache=" + cache.name() +
                        ", part=" + p +
                        ", node=" + node0.name() +
                        ", crd=" + crdNode.name() + ']', crdOwners, owners);
                }
            }
        }
    }

    /**
     * Takes the ready affinity version of the started node with the smallest order and asserts
     * that, for this version, all nodes report equal affinity assignments for caches with the
     * same name.
     */
    private void checkAffinity() {
        List<Ignite> nodes = G.allGrids();

        ClusterNode oldest = null;

        for (Ignite ignite : nodes) {
            ClusterNode loc = ignite.cluster().localNode();

            if (oldest == null || loc.order() < oldest.order())
                oldest = loc;
        }

        AffinityTopologyVersion readyVer = ((IgniteEx)grid(oldest)).
            context().cache().context().exchange().readyAffinityVersion();

        // Cache name -> assignment seen first, all later assignments are compared against it.
        Map<String, List<List<ClusterNode>>> expAff = new HashMap<>();

        for (Ignite ignite : nodes) {
            IgniteKernal kernal = (IgniteKernal)ignite;

            for (IgniteInternalCache<?, ?> cache : kernal.context().cache().caches()) {
                List<List<ClusterNode>> nodeAff = cache.context().affinity().assignments(readyVer);

                List<List<ClusterNode>> prevAff = expAff.putIfAbsent(cache.name(), nodeAff);

                if (prevAff != null)
                    assertEquals(prevAff, nodeAff);
            }
        }
    }

    /**
     * For every cache name performs 10 put/get round trips through the given node, using random
     * keys from {@code [startKey, startKey + keyRange)}, and asserts that the value just written
     * is read back.
     *
     * @param node Node.
     * @param startKey Start key.
     * @param keyRange Keys range.
     */
    private void checkNodeCaches(Ignite node, int startKey, int keyRange) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (String cacheName : cacheNames) {
            String err = "Invalid value [node=" + node.name() +
                ", client=" + node.configuration().isClientMode() +
                ", order=" + node.cluster().localNode().order() +
                ", cache=" + cacheName + ']';

            IgniteCache<Object, Object> cache = node.cache(cacheName);

            int val = 0;

            while (val < 10) {
                Integer key = rnd.nextInt(keyRange) + startKey;

                cache.put(key, val);

                Object read = cache.get(key);

                assertEquals(err, val, read);

                val++;
            }
        }
    }

    /**
     * Checks cache operations through the given node. For every cache name a task is submitted to
     * the executor which performs single put/get and putAll/getAll checks with random keys and,
     * for transactional caches, explicit transaction checks for every concurrency/isolation pair.
     * The method waits for all submitted tasks to complete.
     * <p>
     * For a partitioned transactional cache without backups a read may also return {@code null}.
     *
     * @param node Node.
     * @throws Exception If failed.
     */
    private void checkNodeCaches(final Ignite node) throws Exception {
        log.info("Check node caches [node=" + node.name() + ']');

        List<Future<?>> futs = new ArrayList<>();

        for (final String cacheName : cacheNames) {
            final IgniteCache<Object, Object> cache = node.cache(cacheName);

            futs.add(executor.submit(new Runnable() {
                @Override public void run() {
                    String nodeInfo = "[node=" + node.name() +
                        ", client=" + node.configuration().isClientMode() +
                        ", order=" + node.cluster().localNode().order() +
                        ", cache=" + cacheName + ']';

                    // Must precede any dereference of the cache, otherwise a missing cache
                    // surfaces as a bare NullPointerException instead of this message.
                    assertNotNull("No cache " + nodeInfo, cache);

                    String err = "Invalid value " + nodeInfo;

                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    CacheConfiguration<?, ?> cCfg = cache.getConfiguration(CacheConfiguration.class);

                    boolean txCacheWithoutBackups = cCfg.getCacheMode() == PARTITIONED &&
                        cCfg.getAtomicityMode() == TRANSACTIONAL &&
                        cCfg.getBackups() == 0;

                    // Single key operations.
                    for (int i = 0; i < 5; i++) {
                        Integer key = rnd.nextInt(20_000);

                        cache.put(key, i);

                        Object val = cache.get(key);

                        if (txCacheWithoutBackups)
                            assertTrue(err, val == null || val.equals(i));
                        else
                            assertEquals(err, i, val);
                    }

                    // Batch operations, TreeMap keeps the keys sorted.
                    for (int i = 0; i < 5; i++) {
                        Map<Integer, Integer> map = new TreeMap<>();

                        for (int j = 0; j < 10; j++) {
                            Integer key = rnd.nextInt(20_000);

                            map.put(key, i);
                        }

                        cache.putAll(map);

                        Map<Object, Object> res = cache.getAll(map.keySet());

                        for (Map.Entry<Integer, Integer> e : map.entrySet()) {
                            Object val = res.get(e.getKey());

                            if (txCacheWithoutBackups)
                                assertTrue(err, val == null || e.getValue().equals(val));
                            else
                                assertEquals(err, e.getValue(), val);
                        }
                    }

                    if (atomicityMode(cache) == TRANSACTIONAL) {
                        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
                            for (TransactionIsolation isolation : TransactionIsolation.values())
                                checkNodeCaches(err, node, cache, concurrency, isolation);
                        }
                    }
                }
            }));
        }

        for (Future<?> fut : futs)
            fut.get();
    }

    /**
     * Inside one explicit transaction puts 5 random keys, reads each of them back and commits;
     * after a successful commit asserts that every written key still returns the written value.
     * If the transaction fails with {@link ClusterTopologyException}, the failure is logged and
     * the check is skipped.
     *
     * @param err Error message.
     * @param node Node.
     * @param cache Cache.
     * @param concurrency Transaction concurrency.
     * @param isolation Transaction isolation.
     */
    private void checkNodeCaches(
        String err,
        Ignite node,
        IgniteCache<Object, Object> cache,
        TransactionConcurrency concurrency,
        TransactionIsolation isolation) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        Map<Object, Object> written = new HashMap<>();

        try (Transaction tx = node.transactions().txStart(concurrency, isolation)) {
            for (int val = 0; val < 5; val++) {
                Integer key = rnd.nextInt(20_000);

                cache.put(key, val);

                Object read = cache.get(key);

                assertEquals(val, read);

                written.put(key, read);
            }

            tx.commit();
        }
        catch (ClusterTopologyException e) {
            info("Tx failed, ignore: " + e);

            return;
        }

        for (Map.Entry<Object, Object> entry : written.entrySet()) {
            Object actual = cache.get(entry.getKey());

            assertEquals(err, entry.getValue(), actual);
        }
    }

    /**
     * Asserts that the topology versions of the completed, non-merged exchange futures of the
     * node (futures whose first event is a discovery custom event are skipped) are equal to the
     * expected versions, and that every cached affinity version with zero minor version is one
     * of the expected versions.
     *
     * @param node Node.
     * @param vers Expected exchange versions.
     */
    private void checkExchanges(Ignite node, long... vers) {
        IgniteKernal kernal = (IgniteKernal)node;

        List<AffinityTopologyVersion> expected = new ArrayList<>();

        for (int i = 0; i < vers.length; i++)
            expected.add(new AffinityTopologyVersion(vers[i]));

        List<AffinityTopologyVersion> completed = new ArrayList<>();

        List<GridDhtPartitionsExchangeFuture> exchFuts =
            kernal.context().cache().context().exchange().exchangeFutures();

        // The list of futures is walked from the last index down to the first one.
        for (int idx = exchFuts.size() - 1; idx >= 0; idx--) {
            GridDhtPartitionsExchangeFuture exchFut = exchFuts.get(idx);

            if (exchFut.isMerged() || !exchFut.exchangeDone() || exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT)
                continue;

            AffinityTopologyVersion resVer = exchFut.topologyVersion();

            Assert.assertNotNull(resVer);

            completed.add(resVer);
        }

        assertEquals(expected, completed);

        for (CacheGroupContext grp : kernal.context().cache().cacheGroups()) {
            for (AffinityTopologyVersion cachedVer : grp.affinity().cachedVersions()) {
                if (cachedVer.minorTopologyVersion() <= 0) {
                    assertTrue("Unexpected version [ver=" + cachedVer + ", exp=" + expected + ']',
                        expected.contains(cachedVer));
                }
            }
        }
    }

    /**
     * Waits up to 15 seconds until the initial version of the last topology future on the node
     * reaches at least {@code topVer}, and asserts that the wait succeeded.
     *
     * @param node Node.
     * @param topVer Exchange version.
     * @throws Exception If failed.
     */
    private void waitForExchangeStart(final Ignite node, final long topVer) throws Exception {
        final GridCachePartitionExchangeManager exchMgr = ((IgniteKernal)node).context().cache().context().exchange();

        PA exchStarted = new PA() {
            @Override public boolean apply() {
                long lastVer = exchMgr.lastTopologyFuture().initialVersion().topologyVersion();

                return lastVer >= topVer;
            }
        };

        assertTrue(GridTestUtils.waitForCondition(exchStarted, 15_000));
    }

    /**
     * Sequentially starts nodes so that node name is consistent with node order.
     * <p>
     * Every node is started in its own thread, but the next start is triggered only after
     * {@code node} has received a node joined event, so the joins are ordered while the starts
     * themselves may still be in progress when this method returns.
     *
     * @param node Some existing node.
     * @param startIdx Start node index.
     * @param cnt Number of nodes.
     * @return Start future.
     * @throws Exception If failed.
     */
    private IgniteInternalFuture startGridsAsync(Ignite node, int startIdx, int cnt) throws Exception {
        GridCompoundFuture<Object, Object> fut = new GridCompoundFuture<>();

        for (int i = 0; i < cnt; i++) {
            final CountDownLatch latch = new CountDownLatch(1);

            node.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    log.info("Got event: " + ((DiscoveryEvent)evt).eventNode().id());

                    latch.countDown();

                    // The listener is needed for a single event only.
                    return false;
                }
            }, EventType.EVT_NODE_JOINED);

            final int nodeIdx = startIdx + i;

            IgniteInternalFuture<Object> fut0 = GridTestUtils.runAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    log.info("Start new node: " + nodeIdx);

                    startGrid(nodeIdx);

                    return null;
                }
            }, "start-node-" + nodeIdx);

            if (!latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
                fail("Failed to wait for node join event [nodeIdx=" + nodeIdx + ']');

            fut.add(fut0);
        }

        fut.markInitialized();

        return fut;
    }

    /**
     * Coordinator change scenarios. They differ in which nodes, if any, received the full
     * message before the coordinator failed.
     */
    enum CoordinatorChangeMode {
        /**
         * Coordinator failed, did not send full message.
         */
        NOBODY_RCVD,

        /**
         * Coordinator failed, but new coordinator received full message
         * and finished exchange.
         */
        NEW_CRD_RCDV,

        /**
         * Coordinator failed, but one of servers (not new coordinator) received full message.
         */
        NON_CRD_RCVD
    }

    /**
     * Delaying communication SPI which delays exactly the messages accepted by the
     * {@code delay(Message)} method of the enclosing test class.
     */

    static class TestDelayExchangeMessagesSpi extends TestDelayingCommunicationSpi {
        /** {@inheritDoc} */
        @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
            // Only the message itself is inspected, the I/O wrapper is ignored.
            return delay(msg);
        }
    }

    /**
     * Tells whether a message has to be delayed to emulate merge of exchanges. Only
     * {@code GridDhtPartitionsAbstractMessage}s are delayed, and only if they carry an exchange ID
     * or are {@code GridDhtPartitionsSingleRequest}s.
     *
     * @param msg Message.
     * @return {@code True} if need to delay message.
     */
    private static boolean delay(Message msg) {
        if (msg instanceof GridDhtPartitionsAbstractMessage) {
            GridDhtPartitionsAbstractMessage partsMsg = (GridDhtPartitionsAbstractMessage)msg;

            boolean hasExchId = partsMsg.exchangeId() != null;

            return hasExchId || partsMsg instanceof GridDhtPartitionsSingleRequest;
        }

        return false;
    }

    /**
     * Test checks that there will be no {@link ConcurrentModificationException}
     * when merging exchanges and iterating over {@link ExchangeDiscoveryEvents#events} at the same time.
     * <p>
     * Nodes 1-8 are started asynchronously with exchange messages blocked on each of them. Then,
     * for every started grid, a thread iterates in a loop over the events of the last topology
     * future while the blocked messages are released. The iterating threads are joined before
     * the final assertions, so their failures are not lost.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoConcurrentModificationExceptionAfterMergeExchanges() throws Exception {
        testSpi = true;

        LogListener logLsnr = matches("Merge exchange future on finish [").build();
        listeningLog.registerAllListeners(logLsnr);

        AtomicBoolean stop = new AtomicBoolean();
        Collection<Exception> exceptions = new ConcurrentLinkedQueue<>();
        List<IgniteInternalFuture<?>> iterFuts = new ArrayList<>();

        try {
            startGrid(0);

            for (int i = 1; i < 9; i++) {
                IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(i));
                TestRecordingCommunicationSpi spi = ((TestRecordingCommunicationSpi)cfg.getCommunicationSpi());

                spi.blockMessages((node, msg) -> delay(msg));
                runAsync(() -> startGrid(cfg), "create-node-" + cfg.getIgniteInstanceName());
                spi.waitForBlocked();
            }

            List<Ignite> allNodes = IgnitionEx.allGridsx();
            CountDownLatch latch = new CountDownLatch(allNodes.size());

            for (Ignite gridEx : allNodes) {
                iterFuts.add(runAsync(() -> {
                    Collection<DiscoveryEvent> evts = ((IgniteEx)gridEx).context().cache().context().exchange()
                        .lastTopologyFuture().events().events();

                    latch.countDown();

                    int i = 0;
                    while (!stop.get()) {
                        try {
                            for (DiscoveryEvent evt : evts) {
                                if (nonNull(evt))
                                    i++;
                            }
                        }
                        catch (ConcurrentModificationException e) {
                            exceptions.add(e);

                            log.error("i = " + i, e);

                            break;
                        }
                    }
                }, "get-ex-" + gridEx.configuration().getIgniteInstanceName()));
            }

            for (Ignite node : allNodes)
                TestRecordingCommunicationSpi.spi(node).stopBlock();

            // Bounded wait: if an iterating task dies before counting down, fail instead of hanging.
            if (!latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
                fail("Failed to wait for iterating threads to start.");

            awaitPartitionMapExchange();
        }
        finally {
            stop.set(true);
        }

        // Join iterating tasks so that all exceptions are recorded and unexpected failures are rethrown.
        for (IgniteInternalFuture<?> iterFut : iterFuts)
            iterFut.get(WAIT_SECONDS, TimeUnit.SECONDS);

        assertTrue(logLsnr.check());
        assertTrue("Unexpected exceptions: " + exceptions, exceptions.isEmpty());
    }
}
